#!/bin/sh

. "$SPEC_PATH/abstract/backup"

# Runs the `path_hash` function of the CI build-functions.sh script on the
# shared abstract/backup spec, passed as a path relative to $PROJECT_PATH.
# Output and exit status are those of the invoked script.
e2e_test_extra_hash() {
  local BUILD_FUNCTIONS_SCRIPT="${PROJECT_PATH}/stackgres-k8s/ci/build/build-functions.sh"
  local SHARED_BACKUP_SPEC="${SPEC_PATH}/abstract/backup"

  "${SHELL}" "${BUILD_FUNCTIONS_SCRIPT}" path_hash \
    "$(realpath --relative-to "${PROJECT_PATH}" "${SHARED_BACKUP_SPEC}")"
}

# Prepares the environment used by the checks of this spec.
#
# Sets the globals CLUSTER_NAME and SHARDED_BACKUP_NAME, creates the sharded
# cluster, loads mock data, takes a first sharded backup and then adds more
# data after it. The order of the steps matters: the second batch of data is
# written only after the first sharded backup has completed.
e2e_test_install() {
  CLUSTER_NAME="$(get_sgshardedcluster_name "$SPEC_NAME")"

  # NOTE(review): presumably provides the object storage the backups are
  # written to - confirm against install_minio.
  install_minio

  # NOTE(review): the meaning of the trailing "3" and "2" arguments is defined
  # by create_or_replace_sharded_cluster, which is not visible here.
  create_or_replace_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" "3" "2"

  # Block until 7 pods are running in the namespace before going on.
  wait_pods_running "$CLUSTER_NAMESPACE" 7
  wait_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  switch_sharded_cluster_to_first "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  create_mock_data

  # A random numeric suffix keeps the backup name unique.
  SHARDED_BACKUP_NAME="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"

  # Second argument is the managedLifecycle value of the SGShardedBackup.
  create_sharded_backup "$SHARDED_BACKUP_NAME" false

  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME"

  rotate_wal_file_sharded_cluster

  # Data written after the first sharded backup has completed.
  create_more_mock_data

  rotate_wal_file_sharded_cluster
}

# Entry point of the spec: hands every check, in order, to run_test together
# with its human readable description.
e2e_test() {
  run_test 'Checking that backup is working' \
    check_backup_is_working

  run_test 'Checking that sharded backup are working' \
    check_sharded_backup_is_working

  run_test 'Checking that automatic sharded backup are working' \
    check_automatic_sharded_backup_is_working

  run_test 'Checking that restoring a sharded backup is working' \
    check_restore_sharded_backup_is_working
}

# Runs check_wal_archive once for each member cluster of the sharded cluster
# (suffixes coord, shard0 and shard1).
#
# CLUSTER_NAME is shadowed with a local so that check_wal_archive sees the
# member cluster name; BACKUP_NAME and NODE are declared local as well, and
# the caller's CLUSTER_NAME is left untouched.
check_backup_is_working() {
  local BACKUP_NAME NODE=0 SUFFIX
  # Must be captured before CLUSTER_NAME is made local on the next line.
  local SHARDED_CLUSTER_NAME="$CLUSTER_NAME"
  local CLUSTER_NAME

  for SUFFIX in coord shard0 shard1
  do
    CLUSTER_NAME="${SHARDED_CLUSTER_NAME}-${SUFFIX}"
    check_wal_archive 0
  done
}

# Verifies manually created sharded backups with managedLifecycle set to true.
#
# 1. Creates SHARDED_BACKUP_NAME_0 and waits for the SGBackup of every member
#    cluster (coord, shard0, shard1) and for the sharded backup itself.
# 2. Checks that .status.sgBackups lists exactly those three SGBackups.
# 3. Creates SHARDED_BACKUP_NAME_1 and SHARDED_BACKUP_NAME_2 and expects
#    SHARDED_BACKUP_NAME_0 to be gone afterwards (retention policy).
#
# Globals: reads CLUSTER_NAME and CLUSTER_NAMESPACE; writes
#          SHARDED_BACKUP_NAME_0, SHARDED_BACKUP_NAME_1, SHARDED_BACKUP_NAME_2.
check_sharded_backup_is_working() {
  SHARDED_BACKUP_NAME_0="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"
  create_sharded_backup "$SHARDED_BACKUP_NAME_0" true

  local BACKUP_NAME NODE=0 SUFFIX
  # Must be captured before CLUSTER_NAME is made local on the next line.
  local SHARDED_CLUSTER_NAME="$CLUSTER_NAME"
  local CLUSTER_NAME

  # wait_backup_is_completed is called with CLUSTER_NAME pointing at the
  # member cluster that owns the SGBackup.
  for SUFFIX in coord shard0 shard1
  do
    CLUSTER_NAME="${SHARDED_CLUSTER_NAME}-${SUFFIX}"
    BACKUP_NAME="${SHARDED_BACKUP_NAME_0}-${SUFFIX}"
    wait_backup_is_completed "$BACKUP_NAME" "$NODE"
  done

  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME_0"

  # The template prints every referenced SGBackup followed by a space, hence
  # the trailing space in the expected line.
  if ! kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" "$SHARDED_BACKUP_NAME_0" \
    --template='{{ range .status.sgBackups }}{{ . }} {{ end }}' \
    | grep -qFx "${SHARDED_BACKUP_NAME_0}-coord ${SHARDED_BACKUP_NAME_0}-shard0 ${SHARDED_BACKUP_NAME_0}-shard1 "
  then
    fail "Backups are not referenced correctly in the sharded backup"
  else
    success "Backups are referenced correctly in the sharded backup"
  fi

  # Point the (local) CLUSTER_NAME back at the sharded cluster so that
  # create_sharded_backup targets it again.
  CLUSTER_NAME="$SHARDED_CLUSTER_NAME"
  SHARDED_BACKUP_NAME_1="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"
  SHARDED_BACKUP_NAME_2="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"
  create_sharded_backup "$SHARDED_BACKUP_NAME_1" true
  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME_1"
  create_sharded_backup "$SHARDED_BACKUP_NAME_2" true
  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME_2"

  # The oldest managed sharded backup is expected to have been removed.
  if kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" "$SHARDED_BACKUP_NAME_0" >/dev/null 2>&1
  then
    fail "Sharded backups retention policy is not working"
  else
    success "Sharded backups retention policy is working"
  fi
}

# Verifies sharded backups while the every-minute cron schedule is enabled.
#
# Enables the schedule, waits until a Completed SGShardedBackup with
# managedLifecycle "true" exists for the sharded cluster, then, for every
# SGBackup referenced by the selected sharded backup, checks that it is
# Completed and runs check_timelines and check_control_data against the
# member cluster it belongs to. Finally the schedule is switched back to the
# value set by disable_sharded_cluster_cron_schedule.
#
# NOTE(review): the row filter used here (and in
# is_automatic_sharded_backup_cr_completed) only looks at status,
# managedLifecycle and cluster name, so a manually created backup with
# managedLifecycle true that still exists would match too - confirm this is
# acceptable.
check_automatic_sharded_backup_is_working() {
  enable_sharded_cluster_cron_schedule

  if wait_until is_automatic_sharded_backup_cr_completed
  then
    success "The full automatic sharded backup is available"
  else
    fail "The full automatic sharded backup is not available"
  fi

  local BACKUP_NAMES

  # Every row printed by the template is
  # "<status> <managedLifecycle> <sgShardedCluster> <sgBackup> <sgBackup> ...";
  # keep the SGBackup names (fields 4 onwards) of the last matching row.
  BACKUP_NAMES="$(kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" \
    --template '{{ range .items }}{{ .status.process.status }} {{ .spec.managedLifecycle }} {{ .spec.sgShardedCluster }} {{ range .status.sgBackups }}{{ . }} {{ end }}{{ print "\n" }}{{ end }}' \
    | grep "^Completed true ${CLUSTER_NAME} " | tail -n 1 | cut -d ' ' -f 4-)"

  if [ -z "$BACKUP_NAMES" ]
  then
    fail "Can not retrieve backups associated to the full automatic sharded backup"
  fi

  local BACKUP_NAME
  # Must be captured before CLUSTER_NAME is made local on the next line.
  local SHARDED_CLUSTER_NAME="$CLUSTER_NAME"
  local CLUSTER_NAME
  local NODE=0
  # BACKUP_NAMES is a space separated list, so it is left unquoted on purpose.
  for BACKUP_NAME in $BACKUP_NAMES
  do
    # CLUSTER_NAME first receives "<status> <sgCluster>" and, when the status
    # is Completed, is then narrowed down to the sgCluster name, which is the
    # value in effect while check_timelines and check_control_data run.
    if CLUSTER_NAME="$(kubectl get sgbackup -n "$CLUSTER_NAMESPACE" "$BACKUP_NAME" \
      --template='{{ .status.process.status }} {{ .spec.sgCluster }}' \
      | grep '^Completed ')"
    then
      CLUSTER_NAME="$(printf %s "$CLUSTER_NAME" | cut -d ' ' -f 2)"

      check_timelines

      check_control_data

      success "The full backup $BACKUP_NAME is available"
    else
      fail "The full backup $BACKUP_NAME is not available"
    fi
  done

  # Point the (local) CLUSTER_NAME back at the sharded cluster before patching
  # its cron schedule.
  CLUSTER_NAME="$SHARDED_CLUSTER_NAME"
  disable_sharded_cluster_cron_schedule
}


# Creates an SGShardedBackup for the sharded cluster $CLUSTER_NAME in
# $CLUSTER_NAMESPACE through `kubectl create`.
#
# $1: name of the SGShardedBackup
# $2: value of spec.managedLifecycle (inserted unquoted in the manifest)
create_sharded_backup() {
  local SHARDED_BACKUP_CR_NAME="$1" SHARDED_BACKUP_MANAGED_LIFECYCLE="$2"

  # The manifest is fed straight to kubectl's standard input.
  kubectl create -f - << EOF
apiVersion: stackgres.io/v1
kind: SGShardedBackup
metadata:
  namespace: "$CLUSTER_NAMESPACE"
  name: "$SHARDED_BACKUP_CR_NAME"
spec:
  sgShardedCluster: "$CLUSTER_NAME"
  managedLifecycle: $SHARDED_BACKUP_MANAGED_LIFECYCLE
EOF
}

# Succeeds when the SGShardedBackup named $1 in $CLUSTER_NAMESPACE reports a
# .status.process.status that is exactly "Completed".
is_sharded_backup_cr_completed() {
  kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" "$1" \
    --template "{{ .status.process.status }}" \
    | grep -qx "Completed"
}

# Waits, for up to three times $E2E_TIMEOUT, until the SGShardedBackup named
# $1 is Completed and reports the outcome through success / fail.
#
# The condition is passed to wait_until as a plain function call instead of
# an eval'd string, the same way is_automatic_sharded_backup_cr_completed is
# used elsewhere in this file.
wait_sharded_backup_is_completed() {
  local BACKUP_NAME="$1"
  if wait_until -t "$((E2E_TIMEOUT * 3))" is_sharded_backup_cr_completed "$BACKUP_NAME"
  then
    success "The sharded backup CR has complete"
  else
    fail "The sharded backup CR has failed"
  fi
}

# Patches the first backup configuration of the sharded cluster so that its
# cron schedule fires at every minute.
enable_sharded_cluster_cron_schedule() {
  # "*/1 * * * *" means "at every minute".
  local CRON_SCHEDULE_PATCH='[{"op":"replace","path":"/spec/configurations/backups/0/cronSchedule","value":"*/1 * * * *"}]'

  kubectl patch sgshardedcluster \
    -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" \
    --type json --patch "$CRON_SCHEDULE_PATCH"
}

# Patches the first backup configuration of the sharded cluster with a cron
# schedule that never fires, retrying the patch through wait_until.
disable_sharded_cluster_cron_schedule() {
  # "0 5 31 2 *" means "at 05:00 on day-of-month 31 in February", a date
  # that does not exist.
  local CRON_SCHEDULE_PATCH='[{"op":"replace","path":"/spec/configurations/backups/0/cronSchedule","value":"0 5 31 2 *"}]'

  wait_until kubectl patch sgshardedcluster \
    -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" \
    --type json --patch "$CRON_SCHEDULE_PATCH"
}

# Succeeds when at least one SGShardedBackup in $CLUSTER_NAMESPACE is
# Completed, has managedLifecycle "true" and targets $CLUSTER_NAME.
is_automatic_sharded_backup_cr_completed() {
  # One "<status> <managedLifecycle> <sgShardedCluster>" row per backup.
  local SHARDED_BACKUP_ROW_TEMPLATE='{{ range .items }}{{ .status.process.status }} {{ .spec.managedLifecycle }} {{ .spec.sgShardedCluster }}{{ print "\n" }}{{ end }}'

  kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" --template "$SHARDED_BACKUP_ROW_TEMPLATE" \
    | grep -q "^Completed true ${CLUSTER_NAME}$"
}

# Verifies that the sharded cluster can be re-created from a sharded backup.
#
# Picks the first Completed SGShardedBackup other than $SHARDED_BACKUP_NAME,
# removes the sharded cluster, re-creates it with
# cluster.initialData.restore.fromBackup.name pointing at that backup and
# finally checks the restored data with check_init_data_after_restore.
#
# Globals: reads CLUSTER_NAME, CLUSTER_NAMESPACE and SHARDED_BACKUP_NAME;
#          writes SHARDED_BACKUP_NAME_FOR_RESTORE.
# Returns: 1, after calling fail, when no suitable sharded backup exists; in
#          that case the existing cluster is left untouched.
check_restore_sharded_backup_is_working() {
  SHARDED_BACKUP_NAME_FOR_RESTORE="$(kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" \
    --template '{{ range .items }}{{ printf "%s %s\n" .status.process.status .metadata.name }}{{ end }}' \
    | grep "^Completed " | cut -d ' ' -f 2 | grep -vxF "$SHARDED_BACKUP_NAME" | head -n 1)"

  # Without a backup to restore from there is no point in destroying the
  # cluster: stop here instead of re-creating it with an empty backup name.
  if [ -z "$SHARDED_BACKUP_NAME_FOR_RESTORE" ]
  then
    fail "Can not find a completed sharded backup to restore from"
    return 1
  fi

  # Keep a copy of the cluster secret, stripped of owner references and
  # labels, under the name "$CLUSTER_NAME-back" before removing the cluster.
  kubectl get secret -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" -o json \
    | jq "del(.metadata.ownerReferences) | del(.metadata.labels) | .metadata.name = \"$CLUSTER_NAME-back\"" \
    | kubectl create -f -

  remove_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  # Re-create the secret under its original name before the cluster is
  # created again.
  # Remove this once patroni fixes the citus integration so that the superuser
  # password is changed based on PATRONI_SUPERUSER_PASSWORD.
  kubectl get secret -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-back" -o json \
    | jq ".metadata.name = \"$CLUSTER_NAME\"" \
    | kubectl create -f -

  create_or_replace_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" "3" "2" \
    --set-string cluster.initialData.restore.fromBackup.name="$SHARDED_BACKUP_NAME_FOR_RESTORE"

  # RESULT is run as a command below (`if "$RESULT"`).
  # NOTE(review): presumably try_function stores "true" or "false" in RESULT
  # and the exit code in EXIT_CODE - confirm against its definition.
  local RESULT EXIT_CODE
  try_function wait_pods_running "$CLUSTER_NAMESPACE" 7
  if "$RESULT"
  then
    SWITCHOVER_TO_FIRST=true try_function wait_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  fi

  if "$RESULT"
  then
    success "The sharded cluster has been recovered from the sharded backup"
  else
    fail "The sharded cluster has not been recovered from the sharded backup"
  fi

  wait_until check_init_data_after_restore
}

# Loads the initial data set through the coordinator pod.
#
# Waits until the first pod of each member cluster (coord, shard0, shard1)
# answers "SELECT 1" on database "ddp", creates the fibonacci table, passes it
# to ddp_create_vs and inserts 1, 2, 3 together with the same values shifted
# by -35791402.
create_mock_data() {
  local MOCK_MEMBER MOCK_VALUE

  for MOCK_MEMBER in coord shard0 shard1
  do
    wait_until run_query -x "$CLUSTER_NAME-$MOCK_MEMBER-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" -q "SELECT 1"
  done

  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" \
    -q "CREATE TABLE fibonacci(num integer, PRIMARY KEY (num));"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" \
    -q "SELECT ddp_create_vs('public','fibonacci','num',array ['ddp_shard_0','ddp_shard_1']);"

  # One INSERT per value, in this exact order.
  for MOCK_VALUE in "1" "2" "3" "1 - 35791402" "2 - 35791402" "3 - 35791402"
  do
    run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" \
      -q "INSERT INTO fibonacci(num) VALUES ($MOCK_VALUE);"
  done
}

# Adds a second batch of rows to the fibonacci table through the coordinator
# pod: 5, 8, 13 together with the same values shifted by -35791402.
create_more_mock_data() {
  local MOCK_VALUE

  # One INSERT per value, in this exact order.
  for MOCK_VALUE in "5" "8" "13" "5 - 35791402" "8 - 35791402" "13 - 35791402"
  do
    run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" \
      -q "INSERT INTO fibonacci(num) VALUES ($MOCK_VALUE);"
  done
}

# Forces a WAL file switch on the first pod of every member cluster (coord,
# shard0, shard1) and waits until the WAL file that was current before the
# switch can be fetched with wal-g.
#
# Globals: reads CLUSTER_NAME, CLUSTER_NAMESPACE, LOG_PATH and E2E_TIMEOUT.
# NOTE(review): CURRENT_WAL_FILE is not declared local, so it stays set to the
# value of the last pod after this function returns - confirm whether any
# other code relies on that before making it local.
rotate_wal_file_sharded_cluster() {
  local POD
  # First pass: store the name of the current WAL file of each pod under
  # $LOG_PATH and issue a CHECKPOINT.
  for POD in "$CLUSTER_NAME-coord-0" "$CLUSTER_NAME-shard0-0" "$CLUSTER_NAME-shard1-0"
  do
    run_query -x "$POD" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" \
      -q "SELECT r.file_name from pg_walfile_name_offset(pg_current_wal_lsn()) as r" > "$LOG_PATH/$POD-current-wal-file"
    run_query -x "$POD" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" -q "CHECKPOINT;"
  done
  # Second pass: switch the WAL file and wait for the recorded one.
  for POD in "$CLUSTER_NAME-coord-0" "$CLUSTER_NAME-shard0-0" "$CLUSTER_NAME-shard1-0"
  do
    CURRENT_WAL_FILE="$(cat "$LOG_PATH/$POD-current-wal-file")"
    # Call pg_switch_wal() until the file name it reports differs from the
    # recorded one. The condition is single quoted so that $POD and
    # $CURRENT_WAL_FILE are expanded each time it is eval'd.
    wait_until -t "$((E2E_TIMEOUT / 5))" eval '[ "$(run_query -x "$POD" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "ddp" \
      -q "SELECT r.file_name from pg_walfile_name_offset(pg_switch_wal()) as r")" != "$CURRENT_WAL_FILE" ]'
    # Retry fetching the recorded WAL file with wal-g inside the patroni
    # container; every attempt is killed after E2E_TIMEOUT / 20 seconds.
    # NOTE(review): a successful fetch presumably proves that the segment has
    # been archived - confirm.
    wait_until -t "$((E2E_TIMEOUT / 5))" timeout -s KILL "$((E2E_TIMEOUT / 20))" \
      kubectl exec -n "$CLUSTER_NAMESPACE" "$POD" -c patroni -- \
      exec-with-env backup -- wal-g wal-fetch "$CURRENT_WAL_FILE" "/tmp/$CURRENT_WAL_FILE"
  done
}

# Checks that the fibonacci table holds both batches of mock data after a
# restore.
#
# The ordered values are queried from database "ddp", joined by removing the
# newlines and compared with the expected concatenation; the outcome is
# reported through success / fail.
check_init_data_after_restore() {
  local RESULT
  RESULT="$(run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "ddp" \
    -q "SELECT num FROM fibonacci ORDER BY num;")"

  if ! printf '%s' "$RESULT" | tr -d '\n' \
    | grep -q "^-35791401-35791400-35791399-35791397-35791394-357913891235813$"
  then
    fail "primary db not restored successfully"
  else
    success "restore primary db restored successfully"
  fi
}

# Checks that the fibonacci table holds only the first batch of mock data
# after a point in time recovery.
#
# The ordered values are queried from database "ddp", joined by removing the
# newlines and compared with the expected concatenation; the outcome is
# reported through success / fail.
check_init_data_after_pitr() {
  local RESULT
  RESULT="$(run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "ddp" \
    -q "SELECT num FROM fibonacci ORDER BY num;")"

  if ! printf '%s' "$RESULT" | tr -d '\n' \
    | grep -q "^-35791401-35791400-35791399123$"
  then
    fail "primary db not restored with PITR successfully"
  else
    success "restore primary db restored with PITR successfully"
  fi
}
